use crate::{bsatn, sys, DeserializeOwned, IterBuf, Serialize, SpacetimeType, TableId};
use core::borrow::Borrow;
use core::convert::Infallible;
use core::fmt;
use core::marker::PhantomData;
pub use spacetimedb_lib::db::raw_def::v9::TableAccess;
use spacetimedb_lib::{
    buffer::{BufReader, Cursor, DecodeError},
    AlgebraicValue,
};
use spacetimedb_lib::{FilterableValue, IndexScanRangeBoundsTerminator};
pub use spacetimedb_primitives::{ColId, IndexId};

/// Implemented for every `TableHandle` struct generated by the [`table`](macro@crate::table) macro.
/// Contains methods that are present for every table, regardless of what unique constraints
/// and indexes are present.
///
/// To get a `TableHandle` for a table *table*, use `ctx.db.{table}()`
/// on a [`ReducerContext`](crate::ReducerContext).
// TODO: should we rename this `TableHandle`? Documenting this, I think that's much clearer.
pub trait Table: TableInternal {
    /// The type of rows stored in this table.
    type Row: SpacetimeType + Serialize + DeserializeOwned + Sized + 'static;

    /// Returns the number of rows of this table.
    ///
    /// This takes into account modifications by the current transaction,
    /// even though those modifications have not yet been committed or broadcast to clients.
    /// This applies generally to insertions, deletions, updates, and iteration as well.
    ///
    /// # Panics
    /// Panics if the underlying `datastore_table_row_count` call fails.
    fn count(&self) -> u64 {
        sys::datastore_table_row_count(Self::table_id()).expect("datastore_table_row_count() call failed")
    }

    /// Iterate over all rows of the table.
    ///
    /// For large tables, this can be a slow operation!
    /// Prefer [filtering](RangedIndex::filter) a [`RangedIndex`] or [finding](UniqueColumn::find) a [`UniqueColumn`] if
    /// possible.
    ///
    /// (This keeps track of changes made to the table since the start of this reducer invocation.
    /// For example, if rows have been deleted since the start of this reducer invocation,
    /// those rows will not be returned by `iter`. Similarly, inserted rows WILL be returned.)
    ///
    /// # Panics
    /// Panics if the underlying `datastore_table_scan_bsatn` call fails.
    #[inline]
    fn iter(&self) -> impl Iterator<Item = Self::Row> {
        let table_id = Self::table_id();
        let iter = sys::datastore_table_scan_bsatn(table_id).expect("datastore_table_scan_bsatn() call failed");
        TableIter::new(iter)
    }

    /// Inserts `row` into the table.
    ///
    /// The return value is the inserted row, with any auto-incrementing columns replaced with computed values.
    /// The `insert` method always returns the inserted row,
    /// even when the table contains no auto-incrementing columns.
    ///
    /// (The returned row is a copy of the row in the database.
    /// Modifying this copy does not directly modify the database.
    /// See [`UniqueColumn::update`] if you want to update the row.)
    ///
    /// May panic if inserting the row violates any constraints.
    /// Callers which intend to handle constraint violation errors should instead use [`Self::try_insert`].
    ///
    /// Inserting an exact duplicate of a row already present in the table is a no-op,
    /// as SpacetimeDB is a set-semantic database.
    /// This is true even for tables with unique constraints;
    /// inserting an exact duplicate of an already-present row will not panic.
    #[track_caller]
    fn insert(&self, row: Self::Row) -> Self::Row {
        // Panic directly in this `#[track_caller]` body rather than inside a closure:
        // closures do not inherit the attribute, so a closure-based panic would be
        // attributed to this file instead of to the caller of `insert`.
        match self.try_insert(row) {
            Ok(inserted) => inserted,
            Err(e) => panic!("{e}"),
        }
    }

    /// The error type for this table for unique constraint violations. Will either be
    /// [`UniqueConstraintViolation`] if the table has any unique constraints, or [`Infallible`]
    /// otherwise.
    type UniqueConstraintViolation: MaybeError<UniqueConstraintViolation>;

    /// The error type for this table for auto-increment overflows. Will either be
    /// [`AutoIncOverflow`] if the table has any auto-incrementing columns, or [`Infallible`]
    /// otherwise.
    type AutoIncOverflow: MaybeError<AutoIncOverflow>;

    /// Counterpart to [`Self::insert`] which allows handling failed insertions.
    ///
    /// For tables with constraints, this method returns an `Err` when the insertion fails rather than panicking.
    /// For tables without any constraints, [`Self::UniqueConstraintViolation`] and [`Self::AutoIncOverflow`]
    /// will be [`std::convert::Infallible`], and this will be a more-verbose [`Self::insert`].
    ///
    /// Inserting an exact duplicate of a row already present in the table is a no-op and returns `Ok`,
    /// as SpacetimeDB is a set-semantic database.
    /// This is true even for tables with unique constraints;
    /// inserting an exact duplicate of an already-present row will return `Ok`.
    #[track_caller]
    fn try_insert(&self, row: Self::Row) -> Result<Self::Row, TryInsertError<Self>> {
        insert::<Self>(row, IterBuf::take())
    }

    /// Deletes a row equal to `row` from the table.
    ///
    /// Returns `true` if the row was present and has been deleted,
    /// or `false` if the row was not present and therefore the table has not changed.
    ///
    /// Unlike [`Self::insert`], there is no need to return the deleted row,
    /// as it must necessarily have been exactly equal to the `row` argument.
    /// No analogue to auto-increment placeholders exists for deletions.
    ///
    /// May panic if deleting the row violates any constraints.
    fn delete(&self, row: Self::Row) -> bool {
        // Note that as of writing deletion is infallible, but future work may define new constraints,
        // e.g. foreign keys, which cause deletion to fail in some cases.
        // If and when these new constraints are added,
        // we should define `Self::ForeignKeyViolation`,
        // analogous to [`Self::UniqueConstraintViolation`].

        // The host call takes a serialized relation (a sequence of rows),
        // so view the single row as a one-element slice.
        let relation = std::slice::from_ref(&row);
        let buf = IterBuf::serialize(relation).expect("failed to serialize the row to delete");
        let count = sys::datastore_delete_all_by_eq_bsatn(Self::table_id(), &buf)
            .expect("datastore_delete_all_by_eq_bsatn() call failed");
        count > 0
    }

    // Re-integrates the BSATN of the `generated_cols` into `row`.
    #[doc(hidden)]
    fn integrate_generated_columns(row: &mut Self::Row, generated_cols: &[u8]);
}

/// Static metadata backing a [`Table`] handle; a supertrait of [`Table`].
///
/// NOTE(review): presumably implemented by the code generated by the
/// [`table`](macro@crate::table) macro rather than by hand — confirm.
#[doc(hidden)]
pub trait TableInternal: Sized {
    /// Name of the table; interpolated into [`TryInsertError`]'s `Debug` and `Display` output.
    const TABLE_NAME: &'static str;
    /// Access level of the table; [`TableAccess::Private`] unless overridden.
    const TABLE_ACCESS: TableAccess = TableAccess::Private;
    /// NOTE(review): presumably the ids of the columns carrying a unique constraint — confirm.
    const UNIQUE_COLUMNS: &'static [u16];
    /// Descriptions of the indexes on this table.
    const INDEXES: &'static [IndexDesc<'static>];
    /// NOTE(review): presumably the id of the primary-key column, if any — confirm. `None` unless overridden.
    const PRIMARY_KEY: Option<u16> = None;
    /// NOTE(review): presumably the ids of the auto-incrementing columns — confirm.
    const SEQUENCES: &'static [u16];
    /// Schedule description for this table, if any; `None` unless overridden.
    const SCHEDULE: Option<ScheduleDesc<'static>> = None;

    /// Returns the ID of this table.
    fn table_id() -> TableId;

    /// Returns the [`ColumnDefault`] entries (column id plus value) for this table.
    fn get_default_col_values() -> Vec<ColumnDefault>;
}

/// Describes a named index with an index type over a set of columns identified by their IDs.
#[derive(Clone, Copy)]
pub struct IndexDesc<'a> {
    /// NOTE(review): presumably the name of the accessor method that exposes the index
    /// on the table handle — confirm.
    pub accessor_name: &'a str,
    /// The index algorithm, together with the column(s) it covers.
    pub algo: IndexAlgo<'a>,
}

/// The algorithm of an index described by an [`IndexDesc`], with the columns it covers.
#[derive(Clone, Copy)]
pub enum IndexAlgo<'a> {
    /// A B-Tree index over one or more columns, given by their ids.
    BTree { columns: &'a [u16] },
    /// A direct index over a single column, given by its id.
    Direct { column: u16 },
}

/// Describes the schedule attached to a table (see [`TableInternal::SCHEDULE`]).
pub struct ScheduleDesc<'a> {
    /// Name of the reducer or procedure associated with the schedule.
    pub reducer_or_procedure_name: &'a str,
    /// NOTE(review): presumably the id of the column holding the scheduled-at value — confirm.
    pub scheduled_at_column: u16,
}

/// A default value for a column, as returned by [`TableInternal::get_default_col_values`].
#[derive(Debug, Clone)]
pub struct ColumnDefault {
    /// Id of the column the default applies to.
    pub col_id: u16,
    /// The default value itself.
    pub value: AlgebraicValue,
}

/// Error signalling that a row operation would have violated a unique constraint.
// TODO: add column name for better error message
#[derive(Debug)]
#[non_exhaustive]
pub struct UniqueConstraintViolation;

impl fmt::Display for UniqueConstraintViolation {
    /// Writes the fixed, human-readable description of the violation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("duplicate unique column")
    }
}

// No underlying cause to report, so the default `source()` (`None`) is kept.
impl std::error::Error for UniqueConstraintViolation {}

/// Error signalling that an auto-inc column overflowed its data type.
#[derive(Debug)]
#[non_exhaustive]
// TODO: add column name for better error message
pub struct AutoIncOverflow;

impl fmt::Display for AutoIncOverflow {
    /// Writes the fixed, human-readable description of the overflow.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("auto-inc sequence overflowed its column type")
    }
}

// No underlying cause to report, so the default `source()` (`None`) is kept.
impl std::error::Error for AutoIncOverflow {}

/// The error type returned from [`Table::try_insert()`], signalling a constraint violation.
///
/// Each variant wraps the table's corresponding associated error type
/// ([`Table::UniqueConstraintViolation`] / [`Table::AutoIncOverflow`]).
pub enum TryInsertError<Tbl: Table> {
    /// A [`UniqueConstraintViolation`].
    ///
    /// Returned from [`Table::try_insert`] if an attempted insertion
    /// has the same value in a unique column as an already-present row.
    ///
    /// This variant is only possible if the table has at least one unique column,
    /// and is otherwise [`std::convert::Infallible`].
    UniqueConstraintViolation(Tbl::UniqueConstraintViolation),

    /// An [`AutoIncOverflow`].
    ///
    /// Returned from [`Table::try_insert`] if an attempted insertion
    /// advances an auto-inc sequence past the bounds of the column type.
    ///
    /// This variant is only possible if the table has at least one auto-inc column,
    /// and is otherwise [`std::convert::Infallible`].
    AutoIncOverflow(Tbl::AutoIncOverflow),
}

impl<Tbl: Table> fmt::Debug for TryInsertError<Tbl> {
    /// Writes `TryInsertError::<{table name}>::` followed by the `Debug` output of the wrapped error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the wrapped error first; both variants are rendered the same way.
        let inner: &dyn fmt::Debug = match self {
            Self::UniqueConstraintViolation(e) => e,
            Self::AutoIncOverflow(e) => e,
        };
        write!(f, "TryInsertError::<{}>::", Tbl::TABLE_NAME)?;
        fmt::Debug::fmt(inner, f)
    }
}

impl<Tbl: Table> fmt::Display for TryInsertError<Tbl> {
    /// Writes ``insertion error on table `{table name}`: `` followed by the `Display`
    /// output of the wrapped error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The trailing space separates the prefix from the wrapped error's message;
        // without it the output reads e.g. "...table `t`:duplicate unique column".
        write!(f, "insertion error on table `{}`: ", Tbl::TABLE_NAME)?;
        match self {
            Self::UniqueConstraintViolation(e) => fmt::Display::fmt(e, f),
            Self::AutoIncOverflow(e) => fmt::Display::fmt(e, f),
        }
    }
}

impl<Tbl: Table> std::error::Error for TryInsertError<Tbl> {
    /// Returns the wrapped error as the cause; this is always `Some`.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::UniqueConstraintViolation(e) => Some(e),
            Self::AutoIncOverflow(e) => Some(e),
        }
    }
}

impl<Tbl: Table> From<TryInsertError<Tbl>> for String {
    fn from(err: TryInsertError<Tbl>) -> Self {
        err.to_string()
    }
}

/// An error type that may or may not be constructible.
///
/// Used as the bound on [`Table::UniqueConstraintViolation`] and [`Table::AutoIncOverflow`]:
/// the real error types can produce a value, whereas [`Infallible`] cannot.
#[doc(hidden)]
pub trait MaybeError<E = Self>: std::error::Error + Send + Sync + Sized + 'static {
    /// Returns an instance of the error, or `None` if the type has no values.
    fn get() -> Option<Self>;
}

// `Infallible` stands in for any error `E` that cannot occur for a given table.
impl<E> MaybeError<E> for Infallible {
    /// Always `None`: `Infallible` has no values to return.
    fn get() -> Option<Self> {
        None
    }
}

impl MaybeError for UniqueConstraintViolation {
    /// Always yields the (unit) error value.
    fn get() -> Option<Self> {
        Some(Self)
    }
}

impl MaybeError for AutoIncOverflow {
    /// Always yields the (unit) error value.
    fn get() -> Option<Self> {
        Some(Self)
    }
}

/// Describes a single column of a table.
pub trait Column {
    /// The table this column belongs to.
    type Table: Table;
    /// The type of the values stored in this column.
    type ColType: SpacetimeType + Serialize + DeserializeOwned;
    /// The name of the column.
    const COLUMN_NAME: &'static str;
    /// Borrows this column's field out of `row`.
    fn get_field(row: &<Self::Table as Table>::Row) -> &Self::ColType;
}

/// A handle to a unique index on a column.
/// Available for `#[unique]` and `#[primary_key]` columns.
///
/// For a table *table* with a column *column*, use `ctx.db.{table}().{column}()`
/// to get a `UniqueColumn` from a [`ReducerContext`](crate::ReducerContext).
///
/// Example:
///
/// ```no_run
/// # #[cfg(target_arch = "wasm32")] mod demo {
/// use spacetimedb::{table, UniqueColumn, ReducerContext, DbContext};
///
/// #[table(name = user)]
/// struct User {
///     #[primary_key]
///     id: u32,
///     #[unique]
///     username: String,
///     dog_count: u64
/// }
///
/// fn demo(ctx: &ReducerContext) {
///     let user = ctx.db().user();
///
///     let by_id: UniqueColumn<_, u32, _> = user.id();
///
///     let mut example_user: User = by_id.find(357).unwrap();
///     example_user.dog_count += 5;
///     by_id.update(example_user);
///
///     let by_username: UniqueColumn<_, String, _> = user.username();
///     by_username.delete(&"Evil Bob".to_string());
/// }
/// # }
/// ```
///
/// <!-- TODO: do we need integer type suffixes on literal arguments, like for RangedIndex? -->
pub struct UniqueColumn<Tbl, ColType, Col> {
    // The handle stores no data; the type parameters alone identify the table, column type and column.
    _marker: PhantomData<(Tbl, ColType, Col)>,
}

impl<Tbl: Table, Col: Index + Column<Table = Tbl>> UniqueColumn<Tbl, Col::ColType, Col> {
    #[doc(hidden)]
    pub const __NEW: Self = Self { _marker: PhantomData };

    /// Finds and returns the row where the value in the unique column matches the supplied `col_val`,
    /// or `None` if no such row is present in the database state.
    //
    // TODO: consider whether we should accept the sought value by ref or by value.
    // Should be consistent with the implementors of `IndexScanRangeBounds` (see below).
    // By-value makes passing `Copy` fields more convenient,
    // whereas by-ref makes passing `!Copy` fields more performant.
    // Can we do something smart with `std::borrow::Borrow`?
    #[inline]
    pub fn find(&self, col_val: impl Borrow<Col::ColType>) -> Option<Tbl::Row>
    where
        for<'a> &'a Col::ColType: FilterableValue,
    {
        find::<Tbl, Col>(col_val.borrow())
    }

    /// Deletes the row where the value in the unique column matches the supplied `col_val`,
    /// if any such row is present in the database state.
    ///
    /// Returns `true` if a row with the specified `col_val` was previously present and has been deleted,
    /// or `false` if no such row was present.
    #[inline]
    pub fn delete(&self, col_val: impl Borrow<Col::ColType>) -> bool {
        self._delete(col_val.borrow()).0
    }

    /// Deletes the row matching `col_val` through the unique index.
    ///
    /// Returns whether any row was deleted, together with the buffer that held
    /// the serialized scan arguments so that the caller may reuse it.
    ///
    /// # Panics
    /// Panics if the underlying host call reports an error.
    fn _delete(&self, col_val: &Col::ColType) -> (bool, IterBuf) {
        let index_id = Col::index_id();
        let args = get_args::<Tbl, Col>(col_val);
        let (prefix, prefix_elems, rstart, rend) = args.args_for_syscall();

        let n_del = sys::datastore_delete_by_index_scan_range_bsatn(index_id, prefix, prefix_elems, rstart, rend)
            .unwrap_or_else(|e| {
                panic!("unique: unexpected error from datastore_delete_by_index_scan_range_bsatn: {e}")
            });

        (n_del > 0, args.data)
    }

    /// Deletes the row where the value in the unique column matches that in the corresponding field of `new_row`, and
    /// then inserts the `new_row`.
    ///
    /// Returns the new row as actually inserted, with computed values substituted for any auto-inc placeholders.
    ///
    /// # Panics
    /// Panics if no row was previously present with the matching value in the unique column,
    /// or if either the delete or the insertion would violate a constraint.
    #[track_caller]
    pub fn update(&self, new_row: Tbl::Row) -> Tbl::Row {
        let buf = IterBuf::take();
        update::<Tbl>(Col::index_id(), new_row, buf)
    }

    /// Inserts `new_row` into the table, first checking for an existing
    /// row with a matching value in the unique column and deleting it if present.
    ///
    /// Be careful: in case of a constraint violation, this method will return Err,
    /// but the previous row will be deleted. If you propagate the error, SpacetimeDB will
    /// rollback the transaction and the old row will be restored. If you ignore the error,
    /// the old row will be lost.
    #[track_caller]
    #[doc(alias = "try_upsert")]
    #[cfg(feature = "unstable")]
    pub fn try_insert_or_update(&self, new_row: Tbl::Row) -> Result<Tbl::Row, TryInsertError<Tbl>> {
        let col_val = Col::get_field(&new_row);
        // If the row doesn't exist, delete will return false, which we ignore.
        let _ = self.delete(col_val);

        // Then, insert the new row.
        let buf = IterBuf::take();
        insert::<Tbl>(new_row, buf)
    }

    /// Inserts `new_row` into the table, first checking for an existing
    /// row with a matching value in the unique column and deleting it if present.
    ///
    /// # Panics
    /// Panics if either the delete or the insertion would violate a constraint.
    #[track_caller]
    #[doc(alias = "upsert")]
    #[cfg(feature = "unstable")]
    pub fn insert_or_update(&self, new_row: Tbl::Row) -> Tbl::Row {
        // Panic directly in this `#[track_caller]` body rather than inside a closure:
        // closures do not inherit the attribute, so a closure-based panic would be
        // attributed to this file instead of to the caller of `insert_or_update`.
        match self.try_insert_or_update(new_row) {
            Ok(inserted) => inserted,
            Err(e) => panic!("{e}"),
        }
    }
}

/// Builds the index-scan arguments for a point lookup of `col_val` on a single-column index.
///
/// The buffer holds only the serialized `Bound::Included(col_val)`:
/// there is no prefix, and the same bytes serve as both range start and range end.
#[inline]
fn get_args<Tbl: Table, Col: Index + Column<Table = Tbl>>(col_val: &Col::ColType) -> IndexScanRangeArgs {
    let point = std::ops::Bound::Included(col_val);
    let data = IterBuf::serialize(&point).unwrap();
    IndexScanRangeArgs {
        data,
        prefix_elems: 0,
        rstart_idx: 0,
        rend_idx: None,
    }
}

/// Looks up the row whose unique column `Col` equals `col_val`, if any.
///
/// # Panics
/// Panics if the host call fails, or if the scan yields more than one row.
#[inline]
fn find<Tbl: Table, Col: Index + Column<Table = Tbl>>(col_val: &Col::ColType) -> Option<Tbl::Row> {
    let index_id = Col::index_id();
    let scan_args = get_args::<Tbl, Col>(col_val);
    let (prefix, prefix_elems, rstart, rend) = scan_args.args_for_syscall();

    // Scan the index for the single matching row.
    let raw_iter = match sys::datastore_index_scan_range_bsatn(index_id, prefix, prefix_elems, rstart, rend) {
        Ok(raw_iter) => raw_iter,
        Err(e) => panic!("unique: unexpected error from `datastore_index_scan_range_bsatn`: {e}"),
    };
    // Hand the argument buffer over to the row iterator instead of dropping it.
    let mut rows = TableIter::new_with_buf(raw_iter, scan_args.data);

    // The unique constraint guarantees at most one match.
    let found = rows.next();
    assert!(
        rows.is_exhausted(),
        "`datastore_index_scan_range_bsatn` on unique field cannot return >1 rows"
    );
    found
}

/// A read-only handle to a unique (single-column) index.
///
/// This is the read-only version of [`UniqueColumn`].
/// It mirrors [`UniqueColumn`] but only exposes read APIs.
/// It cannot insert or delete rows.
/// It is used by `{table}__ViewHandle` to keep view code read-only at compile time.
///
/// Note, the `Tbl` generic is the read-write table handle `{table}__TableHandle`.
/// This is because read-only indexes still need [`Table`] metadata.
/// The view handle itself deliberately does not implement `Table`.
pub struct UniqueColumnReadOnly<Tbl, ColType, Col> {
    // The handle stores no data; the type parameters alone identify the table, column type and column.
    _marker: PhantomData<(Tbl, ColType, Col)>,
}

impl<Tbl: Table, Col: Index + Column<Table = Tbl>> UniqueColumnReadOnly<Tbl, Col::ColType, Col> {
    #[doc(hidden)]
    pub const __NEW: Self = Self { _marker: PhantomData };

    /// Finds and returns the row where the value in the unique column matches the supplied `col_val`,
    /// or `None` if no such row is present in the database state.
    #[inline]
    pub fn find(&self, col_val: impl Borrow<Col::ColType>) -> Option<Tbl::Row>
    where
        for<'a> &'a Col::ColType: FilterableValue,
    {
        let key: &Col::ColType = col_val.borrow();
        find::<Tbl, Col>(key)
    }
}

/// Identifies an index; implemented by the marker types used as the `Idx`/`Col`
/// parameter of the index handles in this module.
pub trait Index {
    /// Returns the ID of this index, as passed to the index-scan host calls.
    fn index_id() -> IndexId;
}

/// A handle to a B-Tree index on a table.
///
/// To get one of these from a `ReducerContext`, use:
/// ```text
/// ctx.db.{table}().{index}()
/// ```
/// for a table *table* and an index *index*.
///
/// Example:
///
/// ```no_run
/// # #[cfg(target_arch = "wasm32")] mod demo {
/// use spacetimedb::{table, RangedIndex, ReducerContext, DbContext};
///
/// #[table(name = user,
///     index(name = dogs_and_name, btree(columns = [dogs, name])))]
/// struct User {
///     id: u32,
///     name: String,
///     /// Number of dogs owned by the user.
///     dogs: u64
/// }
///
/// fn demo(ctx: &ReducerContext) {
///     let by_dogs_and_name: RangedIndex<_, (u64, String), _> = ctx.db.user().dogs_and_name();
/// }
/// # }
/// ```
///
/// For single-column indexes, use the name of the column:
///
/// ```no_run
/// # #[cfg(target_arch = "wasm32")] mod demo {
/// use spacetimedb::{table, RangedIndex, ReducerContext, DbContext};
///
/// #[table(name = user)]
/// struct User {
///     id: u32,
///     username: String,
///     #[index(btree)]
///     dogs: u64
/// }
///
/// fn demo(ctx: &ReducerContext) {
///     let by_dogs: RangedIndex<_, (u64,), _> = ctx.db().user().dogs();
/// }
/// # }
/// ```
///
pub struct RangedIndex<Tbl: Table, IndexType, Idx: Index> {
    // The handle stores no data; the type parameters alone identify the table, indexed column types and index.
    _marker: PhantomData<(Tbl, IndexType, Idx)>,
}

impl<Tbl: Table, IndexType, Idx: Index> RangedIndex<Tbl, IndexType, Idx> {
    #[doc(hidden)]
    pub const __NEW: Self = Self { _marker: PhantomData };

    /// Returns an iterator over all rows in the database state where the indexed column(s) match the bounds `b`.
    ///
    /// This method accepts a variable number of arguments using the [`IndexScanRangeBounds`] trait.
    /// What is accepted depends on the type of the B-Tree index. `b` may be:
    /// - A value for the first indexed column.
    /// - A range of values for the first indexed column.
    /// - A tuple of values for any prefix of the indexed columns, optionally terminated by a range for the next.
    ///
    /// For example:
    ///
    /// ```no_run
    /// # #[cfg(target_arch = "wasm32")] mod demo {
    /// use spacetimedb::{table, ReducerContext, RangedIndex};
    ///
    /// #[table(name = user,
    ///     index(name = dogs_and_name, btree(columns = [dogs, name])))]
    /// struct User {
    ///     id: u32,
    ///     name: String,
    ///     dogs: u64
    /// }
    ///
    /// fn demo(ctx: &ReducerContext) {
    ///     let by_dogs_and_name: RangedIndex<_, (u64, String), _> = ctx.db.user().dogs_and_name();
    ///
    ///     // Find user with exactly 25 dogs.
    ///     for user in by_dogs_and_name.filter(25u64) { // The `u64` is required, see below.
    ///         /* ... */
    ///     }
    ///
    ///     // Find user with at least 25 dogs.
    ///     for user in by_dogs_and_name.filter(25u64..) {
    ///         /* ... */
    ///     }
    ///
    ///     // Find user with exactly 25 dogs, and a name beginning with "J".
    ///     for user in by_dogs_and_name.filter((25u64, "J".."K")) {
    ///         /* ... */
    ///     }
    ///
    ///     // Find user with exactly 25 dogs, and exactly the name "Joseph".
    ///     for user in by_dogs_and_name.filter((25u64, "Joseph")) {
    ///         /* ... */
    ///     }
    ///
    ///     // You can also pass arguments by reference if desired.
    ///     for user in by_dogs_and_name.filter((&25u64, &"Joseph".to_string())) {
    ///         /* ... */
    ///     }
    /// }
    /// # }
    /// ```
    ///
    /// **NOTE:** An unfortunate interaction between Rust's trait solver and integer literal defaulting rules
    /// means that you must specify the types of integer literals passed to `filter` and `find` methods
    /// via the suffix syntax, like `21u32`.
    ///
    /// If you don't, you'll see a compiler error like:
    /// > ```text
    /// > error[E0271]: type mismatch resolving `<i32 as FilterableValue>::Column == u32`
    /// >    --> modules/rust-wasm-test/src/lib.rs:356:48
    /// >     |
    /// > 356 |     for person in ctx.db.person().age().filter(21) {
    /// >     |                                         ------ ^^ expected `u32`, found `i32`
    /// >     |                                         |
    /// >     |                                         required by a bound introduced by this call
    /// >     |
    /// >     = note: required for `i32` to implement `IndexScanRangeBounds<(u32,), SingleBound>`
    /// > note: required by a bound in `RangedIndex::<Tbl, IndexType, Idx>::filter`
    /// >     |
    /// > 410 |     pub fn filter<B, K>(&self, b: B) -> impl Iterator<Item = Tbl::Row>
    /// >     |            ------ required by a bound in this associated function
    /// > 411 |     where
    /// > 412 |         B: IndexScanRangeBounds<IndexType, K>,
    /// >     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `RangedIndex::<Tbl, IndexType, Idx>::filter`
    /// > ```
    /// <!-- TODO: check if that error is up to date! -->
    pub fn filter<B, K>(&self, b: B) -> impl Iterator<Item = Tbl::Row>
    where
        B: IndexScanRangeBounds<IndexType, K>,
    {
        filter::<Tbl, Idx, IndexType, B, K>(b)
    }

    /// Deletes all rows in the database state where the indexed column(s) match the bounds `b`,
    /// and returns the number of rows deleted.
    ///
    /// This method accepts a variable number of arguments using the [`IndexScanRangeBounds`] trait.
    /// What is accepted depends on the type of the B-Tree index. `b` may be:
    /// - A value for the first indexed column.
    /// - A range of values for the first indexed column.
    /// - A tuple of values for any prefix of the indexed columns, optionally terminated by a range for the next.
    ///
    /// For example:
    ///
    /// ```no_run
    /// # #[cfg(target_arch = "wasm32")] mod demo {
    /// use spacetimedb::{table, ReducerContext, RangedIndex};
    ///
    /// #[table(name = user,
    ///     index(name = dogs_and_name, btree(columns = [dogs, name])))]
    /// struct User {
    ///     id: u32,
    ///     name: String,
    ///     dogs: u64
    /// }
    ///
    /// fn demo(ctx: &ReducerContext) {
    ///     let by_dogs_and_name: RangedIndex<_, (u64, String), _> = ctx.db.user().dogs_and_name();
    ///
    ///     // Delete users with exactly 25 dogs.
    ///     by_dogs_and_name.delete(25u64); // The `u64` is required, see below.
    ///
    ///     // Delete users with at least 25 dogs.
    ///     by_dogs_and_name.delete(25u64..);
    ///
    ///     // Delete users with exactly 25 dogs, and a name beginning with "J".
    ///     by_dogs_and_name.delete((25u64, "J".."K"));
    ///
    ///     // Delete users with exactly 25 dogs, and exactly the name "Joseph".
    ///     by_dogs_and_name.delete((25u64, "Joseph"));
    ///
    ///     // You can also pass arguments by reference if desired.
    ///     by_dogs_and_name.delete((&25u64, &"Joseph".to_string()));
    /// }
    /// # }
    /// ```
    ///
    /// **NOTE:** An unfortunate interaction between Rust's trait solver and integer literal defaulting rules
    /// means that you must specify the types of integer literals passed to `filter` and `find` methods
    /// via the suffix syntax, like `21u32`.
    ///
    /// If you don't, you'll see a compiler error like:
    /// > ```text
    /// > error[E0271]: type mismatch resolving `<i32 as FilterableValue>::Column == u32`
    /// >    --> modules/rust-wasm-test/src/lib.rs:356:48
    /// >     |
    /// > 356 |     for person in ctx.db.person().age().filter(21) {
    /// >     |                                         ------ ^^ expected `u32`, found `i32`
    /// >     |                                         |
    /// >     |                                         required by a bound introduced by this call
    /// >     |
    /// >     = note: required for `i32` to implement `IndexScanRangeBounds<(u32,), SingleBound>`
    /// > note: required by a bound in `RangedIndex::<Tbl, IndexType, Idx>::filter`
    /// >     |
    /// > 410 |     pub fn filter<B, K>(&self, b: B) -> impl Iterator<Item = Tbl::Row>
    /// >     |            ------ required by a bound in this associated function
    /// > 411 |     where
    /// > 412 |         B: IndexScanRangeBounds<IndexType, K>,
    /// >     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `RangedIndex::<Tbl, IndexType, Idx>::filter`
    /// > ```
    ///
    /// May panic if deleting any one of the rows would violate a constraint,
    /// though at present no such constraints exist.
    pub fn delete<B, K>(&self, b: B) -> u64
    where
        B: IndexScanRangeBounds<IndexType, K>,
    {
        let index_id = Idx::index_id();
        let scan_args = b.get_args();
        let (prefix, prefix_elems, rstart, rend) = scan_args.args_for_syscall();
        match sys::datastore_delete_by_index_scan_range_bsatn(index_id, prefix, prefix_elems, rstart, rend) {
            // Widen the host's row count to the `u64` this method returns.
            Ok(n_deleted) => n_deleted.into(),
            Err(e) => panic!("unexpected error from `datastore_delete_by_index_scan_range_bsatn`: {e}"),
        }
    }
}

/// Shared implementation of `filter` for the read-write and read-only ranged index handles.
///
/// Scans the index `Idx` for rows matching the bounds `b` and returns an iterator over them.
///
/// # Panics
/// Panics if the underlying host call reports an error.
fn filter<Tbl, Idx, IndexType, B, K>(b: B) -> impl Iterator<Item = Tbl::Row>
where
    Tbl: Table,
    Idx: Index,
    B: IndexScanRangeBounds<IndexType, K>,
{
    let index_id = Idx::index_id();
    let scan_args = b.get_args();
    let (prefix, prefix_elems, rstart, rend) = scan_args.args_for_syscall();
    match sys::datastore_index_scan_range_bsatn(index_id, prefix, prefix_elems, rstart, rend) {
        Ok(raw_iter) => TableIter::new(raw_iter),
        Err(e) => panic!("unexpected error from `datastore_index_scan_range_bsatn`: {e}"),
    }
}

/// A read-only handle to a B-tree index.
///
/// This is the read-only version of [`RangedIndex`].
/// It mirrors [`RangedIndex`] but exposes only `.filter(..)`, not `.delete(..)`.
/// It is used by `{table}__ViewHandle` to keep view code read-only at compile time.
///
/// Note, the `Tbl` generic is the read-write table handle `{table}__TableHandle`.
/// This is because read-only indexes still need [`Table`] metadata.
/// The view handle itself deliberately does not implement `Table`.
pub struct RangedIndexReadOnly<Tbl: Table, IndexType, Idx: Index> {
    // The handle stores no data; the type parameters alone identify the table, indexed column types and index.
    _marker: PhantomData<(Tbl, IndexType, Idx)>,
}

impl<Tbl: Table, IndexType, Idx: Index> RangedIndexReadOnly<Tbl, IndexType, Idx> {
    #[doc(hidden)]
    pub const __NEW: Self = Self { _marker: PhantomData };

    /// Returns an iterator over all rows in the database state where the indexed column(s) match the bounds `b`.
    ///
    /// Accepts the same arguments as [`RangedIndex::filter`]; see that method for
    /// the supported forms of `b` and for examples.
    pub fn filter<B, K>(&self, b: B) -> impl Iterator<Item = Tbl::Row>
    where
        B: IndexScanRangeBounds<IndexType, K>,
    {
        // Delegates to the module-level `filter` function.
        filter::<Tbl, Idx, IndexType, B, K>(b)
    }
}

/// Trait used for overloading methods on [`RangedIndex`].
/// See [`RangedIndex`] for more information.
///
/// `T` is the index's type (the `IndexType` parameter of [`RangedIndex`]).
/// NOTE(review): `K` looks like a marker that distinguishes otherwise-overlapping
/// impls for different argument shapes — confirm.
pub trait IndexScanRangeBounds<T, K = ()> {
    /// Serializes these bounds into the arguments for a ranged index scan.
    #[doc(hidden)]
    fn get_args(&self) -> IndexScanRangeArgs;
}

#[doc(hidden)]
/// Arguments to one of the ranged-index-scan-related host-/sys-calls.
///
/// All pointers passed into the syscall are packed into a single buffer, `data`,
/// with slices taken at the appropriate offsets, to save allocations in WASM.
pub struct IndexScanRangeArgs {
    // Backing buffer: the prefix bytes, followed by the range start and (if distinct) the range end.
    data: IterBuf,
    // Number of elements in the prefix; handed to the syscall as a `ColId`.
    prefix_elems: usize,
    // Offset in `data` where the prefix ends and the range start begins.
    rstart_idx: usize,
    // Offset in `data` where the range start ends and the range end begins.
    // None if rstart and rend are the same
    rend_idx: Option<usize>,
}

impl IndexScanRangeArgs {
    /// Splits `self.data` into the `(prefix, prefix_elems, rstart, rend)` tuple the syscalls expect.
    ///
    /// When `rend_idx` is `None`, the range start and range end are the same slice.
    pub(crate) fn args_for_syscall(&self) -> (&[u8], ColId, &[u8], &[u8]) {
        let data = &self.data;
        let prefix = &data[..self.rstart_idx];
        let (rstart, rend) = match self.rend_idx {
            Some(rend_idx) => (&data[self.rstart_idx..rend_idx], &data[rend_idx..]),
            None => {
                // A single bound serves as both ends of the range.
                let both = &data[self.rstart_idx..];
                (both, both)
            }
        };
        let prefix_elems = ColId::from(self.prefix_elems);
        (prefix, prefix_elems, rstart, rend)
    }
}

// Implement `IndexScanRangeBounds` for all the different index column types
// and filter argument types we support.
macro_rules! impl_index_scan_range_bounds {
    // In the first pattern, we accept two Prolog-style lists of type variables,
    // the first of which we use for the column types in the index,
    // and the second for the arguments supplied to the filter function.
    // We do our "outer recursion" to visit the sublists of these two lists,
    // at each step implementing the trait for indexes of that many columns.
    //
    // There's also an "inner recursion" later on, which, given a fixed number of columns,
    // implements the trait with the arguments being all the prefixes of that list.
    (($ColTerminator:ident $(, $ColPrefix:ident)*), ($ArgTerminator:ident $(, $ArgPrefix:ident)*)) => {
        // Implement the trait for all arguments to N-column indexes.
        // The "inner recursion" described above happens in here.
        impl_index_scan_range_bounds!(@inner_recursion (), ($ColTerminator $(, $ColPrefix)*), ($ArgTerminator $(, $ArgPrefix)*));

        // Recurse on the suffix of the two lists, to implement the trait for all arguments to (N - 1)-column indexes.
        impl_index_scan_range_bounds!(($($ColPrefix),*), ($($ArgPrefix),*));
    };
    // Base case for the previous "outer recursion."
    ((), ()) => {};

    // The recursive case for the inner loop.
    //
    // When we start this recursion, `$ColUnused` will be empty,
    // so we'll implement N-element queries on N-column indexes.
    // The next call will move one type name from `($ColTerminator, $ColPrefix)` into `$ColUnused`,
    // so we'll implement (N - 1)-element queries on N-column indexes.
    // And so on.
    (@inner_recursion ($($ColUnused:ident),*), ($ColTerminator:ident $(, $ColPrefix:ident)+), ($ArgTerminator:ident $(, $ArgPrefix:ident)+)) => {
        // Emit the actual `impl IndexScanRangeBounds` form for M-element queries on N-column indexes.
        impl_index_scan_range_bounds!(@emit_impl ($($ColUnused),*), ($ColTerminator $(,$ColPrefix)*), ($ArgTerminator $(, $ArgPrefix)*));
        // Recurse, to implement for (M - 1)-element queries on N-column indexes.
        impl_index_scan_range_bounds!(@inner_recursion ($($ColUnused,)* $ColTerminator), ($($ColPrefix),*), ($($ArgPrefix),*));
    };
    // Base case for the inner recursive loop, when there is only one column remaining.
    // Implement the trait for both single-element tuples of arguments,
    // and for an argument passed outside of a tuple.
    //
    // As in the following `@emit_impl` case:
    // - `$ColUnused` are the types of the ignored suffix of the indexed columns.
    // - `$ColTerminator` is the type of the queried indexed column,
    //   which may have a range supplied as its argument.
    // - `$ArgTerminator` is the type of the argument provided for the queried column.
    //   More precisely it is the "inner" type, like `i32` or `&str`,
    //   which may be wrapped in a range like `std::ops::Range<$ArgTerminator>`.
    // - `Term` (not a meta-variable) is the type of the range wrapped around the `$ArgTerminator`.
    (@inner_recursion ($($ColUnused:ident),*), ($ColTerminator:ident), ($ArgTerminator:ident)) => {
        // Implementation for one-element tuples: defer to the implementation for bare values.
        impl<
            $($ColUnused,)*
            $ColTerminator,
            Term: IndexScanRangeBoundsTerminator<Arg = $ArgTerminator>,
            $ArgTerminator: FilterableValue<Column = $ColTerminator>,
        > IndexScanRangeBounds<($ColTerminator, $($ColUnused,)*)> for (Term,) {
            fn get_args(&self) -> IndexScanRangeArgs {
                IndexScanRangeBounds::<($ColTerminator, $($ColUnused,)*), SingleBound>::get_args(&self.0)
            }
        }
        // Implementation for bare values: serialize the value as the terminating bounds.
        // The `SingleBound` marker keeps this impl distinct from the tuple impls.
        impl<
            $($ColUnused,)*
            $ColTerminator,
            Term: IndexScanRangeBoundsTerminator<Arg = $ArgTerminator>,
            $ArgTerminator: FilterableValue<Column = $ColTerminator>,
        > IndexScanRangeBounds<($ColTerminator, $($ColUnused,)*), SingleBound> for Term {
            fn get_args(&self) -> IndexScanRangeArgs {
                let mut data = IterBuf::take();
                // There is no prefix, so the range starts at offset 0 of `data`.
                let rend_idx = self.bounds().serialize_into(&mut data);
                IndexScanRangeArgs { data, prefix_elems: 0, rstart_idx: 0, rend_idx }
            }
        }
    };

    // Emit a single `impl IndexScanRangeBounds` for a query of two or more elements:
    // one or more prefix values followed by a terminating value or range.
    //
    // - `$ColUnused` are the types of the ignored suffix of the indexed columns.
    // - `$ColTerminator` is the type of the last queried indexed column,
    //   which may have a range supplied as its argument.
    // - `$ColPrefix` are the types of the queried prefix of the indexed columns,
    //   which must have single values supplied as their arguments.
    // - `$ArgTerminator` is the type of the argument provided for the last queried column.
    //   More precisely it is the "inner" type, like `i32` or `&str`,
    //   which may be wrapped in a range like `std::ops::Range<$ArgTerminator>`.
    // - `Term` (not a meta-variable) is the type of the range wrapped around the `$ArgTerminator`.
    // - `$ArgPrefix` are the types of the arguments provided for the queried prefix columns.
    (@emit_impl ($($ColUnused:ident),*), ($ColTerminator:ident $(, $ColPrefix:ident)+), ($ArgTerminator:ident $(, $ArgPrefix:ident)+)) => {
        impl<
            $($ColUnused,)*
            $ColTerminator,
            $($ColPrefix,)*
            Term: IndexScanRangeBoundsTerminator<Arg = $ArgTerminator>,
            $ArgTerminator: FilterableValue<Column = $ColTerminator>,
            $($ArgPrefix: FilterableValue<Column = $ColPrefix>,)+
        > IndexScanRangeBounds<
            ($($ColPrefix,)+
             $ColTerminator,
             $($ColUnused,)*)
          > for ($($ArgPrefix,)+ Term,) {
            fn get_args(&self) -> IndexScanRangeArgs {
                let mut data = IterBuf::take();

                // Get the number of prefix elements.
                let prefix_elems = impl_index_scan_range_bounds!(@count $($ColPrefix)+);

                // Destructure the argument tuple into variables with the same names as their types.
                #[allow(non_snake_case)]
                let ($($ArgPrefix,)+ term,) = self;

                // For each prefix queried, serialize it into the `data` buffer.
                // The `and_then` chain stops at the first serialization error, which `unwrap` turns into a panic.
                Ok(())
                    $(.and_then(|()| data.serialize_into($ArgPrefix)))+
                    .unwrap();

                // Remember the separator between the prefix and the terminator,
                // so that we can slice them separately and pass them to the appropriate filter host call.
                let rstart_idx = data.len();

                // Serialize the terminating range,
                // and get the info required to separately slice the lower and upper bounds of that range
                // since the host call takes those as separate slices.
                let rend_idx = term.bounds().serialize_into(&mut data);
                IndexScanRangeArgs { data, prefix_elems, rstart_idx, rend_idx }
            }
        }
    };

    // Counts the number of elements in the tuple.
    // Expands to `0 + 1 + ... + 1`, with one `1` per identifier passed in.
    (@count $($T:ident)*) => {
        0 $(+ impl_index_scan_range_bounds!(@drop $T 1))*
    };
    // Discards the first token tree and expands to the second.
    (@drop $a:tt $b:tt) => { $b };
}

/// Marker type used as the `K` parameter of [`IndexScanRangeBounds`]
/// by the implementation for a bare argument, i.e. one not wrapped in a tuple.
///
/// It carries no data; it only keeps that implementation distinct from the tuple ones.
pub struct SingleBound;

// Generate the `IndexScanRangeBounds` impls for indexes of one through six columns.
// The identifiers are only used as generic type parameter names inside the generated impls.
impl_index_scan_range_bounds!(
    (ColA, ColB, ColC, ColD, ColE, ColF),
    (ArgA, ArgB, ArgC, ArgD, ArgE, ArgF)
);

// Single-column indexes
// impl<T> IndexScanRangeBounds<(T,)> for Range<T> {}
// impl<T> IndexScanRangeBounds<(T,)> for T {}

// // Two-column indexes
// impl<T, U> IndexScanRangeBounds<(T, U)> for Range<T> {}
// impl<T, U> IndexScanRangeBounds<(T, U)> for T {}
// impl<T, U> IndexScanRangeBounds<(T, U)> for (T, Range<U>) {}
// impl<T, U> IndexScanRangeBounds<(T, U)> for (T, U) {}

// // Three-column indexes
// impl<T, U, V> IndexScanRangeBounds<(T, U, V)> for Range<T> {}
// impl<T, U, V> IndexScanRangeBounds<(T, U, V)> for T {}
// impl<T, U, V> IndexScanRangeBounds<(T, U, V)> for (T, Range<U>) {}
// impl<T, U, V> IndexScanRangeBounds<(T, U, V)> for (T, U) {}
// impl<T, U, V> IndexScanRangeBounds<(T, U, V)> for (T, U, Range<V>) {}
// impl<T, U, V> IndexScanRangeBounds<(T, U, V)> for (T, U, V) {}

/// Implemented by types that can have a sequence based on them.
///
/// Auto-inc columns rely on this to find out whether inserting a row
/// requires the column in that row to be updated afterwards.
pub trait SequenceTrigger: Sized {
    /// Returns `true` if this value, used as a column value,
    /// will trigger a sequence, if there is one.
    /// For numeric types, the triggering value is `0`.
    fn is_sequence_trigger(&self) -> bool;
    /// Decodes a value of this type from `reader`.
    /// Should invoke `BufReader::get_{Self}`, for example `BufReader::get_u32`.
    fn decode(reader: &mut &[u8]) -> Result<Self, DecodeError>;
    /// If `self` is a sequence trigger, replace it with a generated column decoded from `gen_cols`.
    /// Otherwise `self` and `gen_cols` are left untouched.
    #[inline(always)]
    fn maybe_decode_into(&mut self, gen_cols: &mut &[u8]) {
        // Nothing was generated for values that did not trigger the sequence.
        if !self.is_sequence_trigger() {
            return;
        }
        *self = match Self::decode(gen_cols) {
            Ok(generated) => generated,
            Err(_) => sequence_decode_error(),
        };
    }
}

/// Panics to report that no generated column could be decoded for a value that was a sequence trigger.
///
/// Never returns. It is marked `#[cold]` and `#[inline(never)]`,
/// presumably to keep the panic path out of its `#[inline(always)]` caller.
#[cold]
#[inline(never)]
fn sequence_decode_error() -> ! {
    unreachable!("a row was a sequence trigger but there was no generated column for it.")
}

// Implements `SequenceTrigger` for each listed type.
//
// Every entry has the form `getter(Type),` where `getter` is the reader method
// called on the `&mut &[u8]` reader to decode a `Type`.
// The trigger value of each generated impl is `0`.
macro_rules! impl_seq_trigger {
    ($($get:ident($t:ty),)*) => {
        $(
            impl SequenceTrigger for $t {
                #[inline(always)]
                fn is_sequence_trigger(&self) -> bool { *self == 0 }
                #[inline(always)]
                fn decode(reader: &mut &[u8]) -> Result<Self, DecodeError> {
                    reader.$get()
                }
            }
        )*
    };
}

// `SequenceTrigger` impls for the primitive integer types from 8 to 128 bits,
// each paired with the reader method that decodes it.
impl_seq_trigger!(
    get_u8(u8),
    get_i8(i8),
    get_u16(u16),
    get_i16(i16),
    get_u32(u32),
    get_i32(i32),
    get_u64(u64),
    get_i64(i64),
    get_u128(u128),
    get_i128(i128),
);

// Written by hand rather than through `impl_seq_trigger!`:
// the trigger value is compared against `Self::ZERO` instead of the literal `0`.
impl SequenceTrigger for crate::sats::i256 {
    /// A value equal to `Self::ZERO` is the sequence trigger.
    #[inline(always)]
    fn is_sequence_trigger(&self) -> bool {
        *self == Self::ZERO
    }
    /// Decodes the value by calling `get_i256` on `reader`.
    #[inline(always)]
    fn decode(reader: &mut &[u8]) -> Result<Self, DecodeError> {
        reader.get_i256()
    }
}

// Written by hand rather than through `impl_seq_trigger!`:
// the trigger value is compared against `Self::ZERO` instead of the literal `0`.
impl SequenceTrigger for crate::sats::u256 {
    /// A value equal to `Self::ZERO` is the sequence trigger.
    #[inline(always)]
    fn is_sequence_trigger(&self) -> bool {
        *self == Self::ZERO
    }
    /// Decodes the value by calling `get_u256` on `reader`.
    #[inline(always)]
    fn decode(reader: &mut &[u8]) -> Result<Self, DecodeError> {
        reader.get_u256()
    }
}

/// Insert `row` into the table `T`, using `buf` as scratch space for the BSATN encoding.
///
/// On success, returns the row with any generated columns
/// (see `T::integrate_generated_columns`) written back into it.
///
/// # Errors
///
/// Returns a [`TryInsertError`] when the host reports `UNIQUE_ALREADY_EXISTS` or `AUTO_INC_OVERFLOW`
/// and the table provides the corresponding error value.
///
/// # Panics
///
/// Panics if the row cannot be serialized,
/// or if the host reports an error that cannot be represented as a [`TryInsertError`] for this table.
/// The panics are raised directly in this function's body rather than inside closures,
/// because closures do not inherit `#[track_caller]`;
/// this way the reported location is the caller's.
#[track_caller]
fn insert<T: Table>(mut row: T::Row, mut buf: IterBuf) -> Result<T::Row, TryInsertError<T>> {
    let table_id = T::table_id();
    // Encode the row as bsatn into the buffer `buf`.
    buf.clear();
    buf.serialize_into(&row).unwrap();

    // Insert row into table.
    // When table has an auto-incrementing column, we must re-decode the changed `buf`.
    match sys::datastore_insert_bsatn(table_id, &mut buf) {
        Ok(gen_cols) => {
            // Let the table handle any generated columns written back by `sys::datastore_insert_bsatn` to `buf`.
            T::integrate_generated_columns(&mut row, gen_cols);
            Ok(row)
        }
        Err(e) => {
            // Translate the errno into the table's typed error, where the table has one.
            let typed = match e {
                sys::Errno::UNIQUE_ALREADY_EXISTS => {
                    T::UniqueConstraintViolation::get().map(TryInsertError::UniqueConstraintViolation)
                }
                sys::Errno::AUTO_INC_OVERFLOW => T::AutoIncOverflow::get().map(TryInsertError::AutoIncOverflow),
                _ => None,
            };
            match typed {
                Some(err) => Err(err),
                None => panic!("unexpected insertion error: {e}"),
            }
        }
    }
}

/// Update a row of type `T` to `row` using the index identified by `index_id`.
///
/// `buf` is used as scratch space for the BSATN encoding of `row`.
/// Returns the row with any generated columns
/// (see `T::integrate_generated_columns`) written back into it.
///
/// # Panics
///
/// Panics if the row cannot be serialized or if the host reports any error.
/// The panic is raised directly in this function's body rather than inside a closure,
/// because closures do not inherit `#[track_caller]`;
/// this way the reported location is the caller's.
#[track_caller]
fn update<T: Table>(index_id: IndexId, mut row: T::Row, mut buf: IterBuf) -> T::Row {
    let table_id = T::table_id();
    // Encode the row as bsatn into the buffer `buf`.
    buf.clear();
    buf.serialize_into(&row).unwrap();

    // Update the row in the table.
    // When table has an auto-incrementing column, we must re-decode the changed `buf`.
    match sys::datastore_update_bsatn(table_id, index_id, &mut buf) {
        Ok(gen_cols) => {
            // Let the table handle any generated columns written back by `sys::datastore_update_bsatn` to `buf`.
            T::integrate_generated_columns(&mut row, gen_cols);
            row
        }
        // TODO(centril): introduce a `TryUpdateError`.
        Err(e) => panic!("unexpected update error: {e}"),
    }
}

/// A table iterator which yields values of the `TableType` corresponding to the table.
///
/// Rows are fetched from `inner` one chunk at a time into a reusable buffer
/// and decoded from that buffer one row per call to `next`.
struct TableIter<T: DeserializeOwned> {
    /// The underlying source of our `Buffer`s.
    inner: sys::RowIter,

    /// The most recently fetched chunk together with the current read position in it,
    /// from which the next row is decoded.
    reader: Cursor<IterBuf>,

    /// Zero-sized marker recording the row type `T` that this iterator yields.
    _marker: PhantomData<T>,
}

impl<T: DeserializeOwned> TableIter<T> {
    /// Wraps `iter`, taking a fresh buffer via `IterBuf::take`.
    #[inline]
    fn new(iter: sys::RowIter) -> Self {
        Self::new_with_buf(iter, IterBuf::take())
    }

    /// Wraps `iter`, reusing the caller-supplied `buf`.
    /// The buffer is cleared first, so the iterator starts with nothing left to decode.
    #[inline]
    fn new_with_buf(iter: sys::RowIter, mut buf: IterBuf) -> Self {
        buf.clear();
        let reader = Cursor::new(buf);
        Self {
            inner: iter,
            reader,
            _marker: PhantomData,
        }
    }

    /// Returns `true` when no undecoded bytes remain in the buffer
    /// and the underlying row iterator is exhausted as well.
    fn is_exhausted(&self) -> bool {
        let buffered = (&self.reader).remaining();
        buffered == 0 && self.inner.is_exhausted()
    }
}

impl<T: DeserializeOwned> Iterator for TableIter<T> {
    type Item = T;

    /// Decodes and returns the next row, fetching further chunks from the host as needed.
    ///
    /// Returns `None` once the buffer is drained and the underlying iterator is exhausted.
    /// Panics with "Failed to decode row!" if the buffered bytes do not decode as a row.
    fn next(&mut self) -> Option<Self::Item> {
        // Refill the buffer for as long as it holds nothing to decode.
        while (&self.reader).remaining() == 0 {
            // Don't fetch the next chunk if there is none.
            if self.inner.is_exhausted() {
                return None;
            }

            // Fetch the next chunk while reusing the buffer, and rewind the read position.
            self.reader.buf.clear();
            self.reader.pos.set(0);
            self.inner.read(&mut self.reader.buf);
        }

        // There are bytes left in the buffer, so decode one row from them.
        let row = bsatn::from_reader(&mut &self.reader).expect("Failed to decode row!");
        Some(row)
    }
}
